package com.atguigu.champter5.Flink05.transformation.aggregation;

import com.atguigu.beans.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class $3_Redece {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        ArrayList<WaterSensor> waterSensors = new ArrayList<>();
        waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 50));
        waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 20));
        waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 60));
        waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));

        DataStreamSource<WaterSensor> waterSensorDS = env.fromCollection(waterSensors);

        waterSensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                })
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        Integer value1Vc = value1.getVc();
                        Integer value2Vc = value2.getVc();
                        value2.setVc(value1Vc + value2Vc);
                        return value2;
                    }
                });

        System.out.println(env.getExecutionPlan());

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
